package mr

import (
	"strconv"
	// "sync/atomic"
	"time"
	"log"
	"net"
	"os"
	"net/rpc"
	"net/http"
	"fmt"
	"sync"
)

var (
	// count uint32 // number of completed map tasks (legacy, unused)
	// reducefinish uint32 // number of completed reduce tasks (legacy, unused)

	// mu guards all Master state (task state maps, worker maps, phase
	// flags and the timers map); every RPC handler and every watchdog
	// goroutine acquires it before touching the Master.
	mu sync.Mutex
)

// State is the lifecycle state of a single map or reduce task.
type State int

// A task starts IDLE, becomes INPROGRESS when assigned to a worker, and
// COMPLETE once the worker reports success. Typed constants (rather than
// untyped ints) let the compiler catch accidental mixing with plain ints;
// the numeric values 0/1/2 are unchanged.
const (
	IDLE       State = iota // not yet assigned to any worker
	INPROGRESS              // assigned, result pending (may time out)
	COMPLETE                // finished successfully
)

// Master coordinates the MapReduce job: it tracks every map and reduce
// task, which worker (if any) currently owns it, and a per-worker timeout
// timer. All fields are protected by the package-level mutex mu.
type Master struct {
	// Your definitions here.
	mapState map[int]State  // map task number -> task state
	fileNames []string // map task number -> input file name
	mapWorker map[int]int  // map task number -> workerID currently assigned
	mapDone bool // true once every map task is COMPLETE

	nReduce int // number of reduce tasks; also how many intermediate files each map task writes
	reduceState map[int]State // reduce task number -> state (0 unassigned, 1 assigned, 2 complete)
	reduceWorker map[int]int // reduce task number -> workerID currently assigned
	reduceDone bool // true once every reduce task is COMPLETE

	timers map[int]*time.Timer  // per-worker 10s timeout timers, keyed by workerID
}

// Your code here -- RPC handlers for the worker to call.

// RegisterWorker is the RPC handler a new worker calls once at startup.
// It hands back a fresh worker ID and arms a 10-second watchdog timer:
// when the timer fires, every task still owned by that worker is returned
// to the IDLE pool so another worker can pick it up.
func (m *Master) RegisterWorker(args *RegistArgs, reply *RegistResp) error {
	mu.Lock()
	defer mu.Unlock()

	id := uniqueID()
	reply.WorkerID = id
	m.timers[id] = time.NewTimer(time.Second * 10)
	log.Printf("Master > new worker %v register", id)

	// Watchdog goroutine: blocks until the timer fires, then reclaims
	// all of the worker's outstanding tasks under the lock. Tasks the
	// worker already finished were removed from the worker maps, so
	// only in-flight work is reset.
	go func(wid int, expired <-chan time.Time) {
		<-expired
		mu.Lock()
		defer mu.Unlock()
		delete(m.timers, wid) // the timer is spent; drop it
		for task, owner := range m.mapWorker {
			if owner != wid {
				continue
			}
			m.mapState[task] = IDLE
			delete(m.mapWorker, task)
			log.Printf("Master > map worker %v time out", wid)
		}
		for task, owner := range m.reduceWorker {
			if owner != wid {
				continue
			}
			m.reduceState[task] = IDLE
			delete(m.reduceWorker, task)
			log.Printf("Master > reduce worker %v time out", wid)
		}
	}(id, m.timers[id].C)

	return nil
}

// AcceptWorker is the RPC handler workers call to request a task.
// While the map phase is running it hands out map tasks; afterwards it
// hands out reduce tasks. A non-nil error means "no task right now";
// reply.NReduce == -1 additionally tells the worker the whole job is
// finished and it should exit.
func (m *Master) AcceptWorker(args *RequestArgs, reply *ResponseType) error {
	mu.Lock()
	defer mu.Unlock()

	reply.NReduce = m.nReduce

	if !m.mapDone { // map phase still running
		jobID := m.assignMapJob(args.WorkerID)
		if jobID == -1 {
			// No idle map task right now; some may still be in
			// progress and could time out, so the worker retries.
			return fmt.Errorf("please apply job again")
		}
		// Give the worker a fresh 10s window for this task. Without
		// the reset the watchdog fires 10s after *registration*, so
		// any worker alive longer than 10s would be falsely timed out.
		if t, ok := m.timers[args.WorkerID]; ok {
			t.Reset(time.Second * 10)
		}
		reply.JobType = MAP
		reply.BucketName = m.fileNames[jobID]
		reply.TaskNum = jobID
		// assignMapJob already recorded jobID -> WorkerID in m.mapWorker.
		return nil
	}

	if !m.reduceDone {
		taskNum := m.assignReduceJob(args.WorkerID)
		if taskNum == -1 {
			// assignReduceJob sets reduceDone only when every task
			// is COMPLETE; only then may workers exit. Tasks that
			// are merely INPROGRESS can still time out and become
			// IDLE again, so workers must keep retrying until then.
			if m.reduceDone {
				reply.NReduce = -1 // tell the worker to stop asking
			}
			return fmt.Errorf("no job available")
		}
		if t, ok := m.timers[args.WorkerID]; ok {
			t.Reset(time.Second * 10)
		}
		reply.JobType = REDUCE
		reply.TaskNum = taskNum
		return nil
	}

	// Both phases already finished: tell any straggler to exit instead of
	// returning a zero-valued (bogus) task assignment.
	reply.NReduce = -1
	return fmt.Errorf("no job available")
}

// WorkerFinished is the RPC handler a worker calls after completing a task.
// If the worker was already timed out, the watchdog removed its entry from
// mapWorker/reduceWorker, so the lookup fails and the late completion is
// rejected with an error; the task will be redone by another worker.
func (m *Master) WorkerFinished(args *RequestArgs, reply *ResponseType) error {
	mu.Lock()
	defer mu.Unlock()

	switch args.JobType {
	case MAP:
		if _, ok := m.mapWorker[args.TaskNum]; !ok {
			// Error strings are lowercase per Go convention.
			return fmt.Errorf("map worker timeout, job : %v", m.fileNames[args.TaskNum])
		}
		m.mapState[args.TaskNum] = COMPLETE
		delete(m.mapWorker, args.TaskNum)
		// log.Printf for consistency with the rest of the file (and the
		// original message lacked a space after "job").
		log.Printf("Master > map job %v finish", m.fileNames[args.TaskNum])
	case REDUCE:
		if _, ok := m.reduceWorker[args.TaskNum]; !ok {
			return fmt.Errorf("reduce worker timeout, job : %v", args.TaskNum)
		}
		m.reduceState[args.TaskNum] = COMPLETE
		delete(m.reduceWorker, args.TaskNum)
		log.Printf("Master > reduce job %v finish", strconv.Itoa(args.TaskNum))
	}
	return nil
}

// assignMapJob picks an IDLE map task, marks it INPROGRESS, records the
// worker as its owner and returns the task number. It returns -1 when no
// idle task exists; if at that point every map task is COMPLETE it also
// sets mapDone. Caller must hold mu.
func (m *Master) assignMapJob(worker int) (job int) {
	allComplete := true
	for id, st := range m.mapState {
		switch st {
		case IDLE:
			m.mapState[id] = INPROGRESS
			m.mapWorker[id] = worker
			return id
		case INPROGRESS:
			// Something is still running; the phase is not over yet.
			allComplete = false
		}
	}
	if allComplete {
		m.mapDone = true
		log.Println("map phase compelet")
	}
	return -1
}

// assignReduceJob picks an IDLE reduce task, marks it INPROGRESS, records
// the worker as its owner and returns the reduce task number (the worker
// then reads the mr-X-<num> intermediate files). It returns -1 when no
// idle task exists; if at that point every reduce task is COMPLETE it also
// sets reduceDone. Caller must hold mu.
func (m *Master) assignReduceJob(worker int) (reduceNum int) {
	allComplete := true
	for id, st := range m.reduceState {
		switch st {
		case IDLE:
			m.reduceState[id] = INPROGRESS
			m.reduceWorker[id] = worker
			return id
		case INPROGRESS:
			// Something is still running; the phase is not over yet.
			allComplete = false
		}
	}
	if allComplete {
		m.reduceDone = true
		log.Println("reduce phase complete")
	}
	return -1
}


// server starts a goroutine that listens for RPCs from worker.go on a
// unix-domain socket.
func (m *Master) server() {
	// rpc.Register returns an error (e.g. no suitable exported methods);
	// the original silently discarded it.
	if err := rpc.Register(m); err != nil {
		log.Fatal("register error:", err)
	}
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := masterSock()
	os.Remove(sockname) // drop any stale socket left by a previous run
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

// Done is called periodically by main/mrmaster.go to find out whether the
// entire job has finished; it reports true once every reduce task has
// completed (reduceDone is set by assignReduceJob under mu).
func (m *Master) Done() bool {
	mu.Lock()
	defer mu.Unlock()
	return m.reduceDone
}

// MakeMaster creates a Master with one IDLE map task per input file and
// nReduce IDLE reduce tasks, starts the RPC server, and returns it.
// main/mrmaster.go calls this function; nReduce is the number of reduce
// tasks to use.
func MakeMaster(files []string, nReduce int) *Master {
	m := &Master{
		fileNames:    files,
		mapState:     make(map[int]State, len(files)),
		mapWorker:    make(map[int]int),
		mapDone:      false,

		nReduce:      nReduce,
		reduceState:  make(map[int]State, nReduce),
		reduceWorker: make(map[int]int),
		reduceDone:   false,

		timers:       make(map[int]*time.Timer),
	}

	// Seed every task in the IDLE state so workers can claim them.
	for i := range files {
		m.mapState[i] = IDLE
	}
	for i := 0; i < nReduce; i++ {
		m.reduceState[i] = IDLE
	}

	m.server()
	return m
}
